package com.markhsiu.minimq.remote.transport.netty;

import java.util.concurrent.ThreadFactory;

import com.markhsiu.minimq.core.constant.ConstantUtil;
import com.markhsiu.minimq.core.exeption.MiniMQException;
import com.markhsiu.minimq.core.thread.ThreadFactoryBuilder;
import com.markhsiu.minimq.remote.AbstractServer;
import com.markhsiu.minimq.remote.Address;
import com.markhsiu.minimq.remote.transport.netty.handler.NettySeverHandler;
import com.markhsiu.minimq.serialize.protostuff.ProtostuffCodec;

import io.netty.bootstrap.ServerBootstrap;
import io.netty.buffer.PooledByteBufAllocator;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.logging.LogLevel;
import io.netty.handler.logging.LoggingHandler;
import io.netty.handler.timeout.IdleStateHandler;
import io.netty.util.concurrent.DefaultEventExecutorGroup;

/**
 * Netty-based server side of the remote transport.
 *
 * <p>Accepts TCP connections on the port of the configured {@link Address},
 * decodes/encodes messages with the Protostuff codec and dispatches them to
 * {@link NettySeverHandler}. The business handler runs on a dedicated
 * executor group so that it does not occupy the I/O event loops.
 *
 * <p>{@link #open()} blocks the calling thread until the server channel is
 * closed; {@link #close()} releases every thread pool owned by this server.
 *
 * Created by Mark Hsiu on 2017/2/8.
 */
public class NettyServer extends AbstractServer{
    /** Number of threads accepting incoming connections. */
    private static final int BOSS_THREADS = 1;

    /** Thread factory for the business executor group. */
    private static final ThreadFactory workerThreadFactory
            = new ThreadFactoryBuilder().setNameFormat("server-default").build();

    /** Size of both the I/O worker pool and the business executor pool. */
    private final int workerThreads = ConstantUtil.getWorkerThreads();

    private final ThreadFactory serverBossTF = new ThreadFactoryBuilder().setNameFormat("server-boss").build();
    private final ThreadFactory serverWorkerTF = new ThreadFactoryBuilder().setNameFormat("server-woker").build();

    /** Event loop group that accepts connections. */
    private final EventLoopGroup boss = new NioEventLoopGroup(BOSS_THREADS, serverBossTF);
    /** Event loop group that performs socket I/O for accepted connections. */
    private final EventLoopGroup worker = new NioEventLoopGroup(workerThreads, serverWorkerTF);
    /** Executor group on which the business handler is invoked. */
    private final DefaultEventExecutorGroup defaultEventExecutorGroup
            = new DefaultEventExecutorGroup(workerThreads, workerThreadFactory);

    /**
     * Creates a server bound to the given address.
     *
     * @param addr address whose port the server will listen on; must not be {@code null}
     * @throws MiniMQException if {@code addr} is {@code null}
     */
    public  NettyServer(Address addr){
        if(addr == null){
            throw new MiniMQException(" address is null");
        }
        super.addr = addr;
    }


    /**
     * Creates a server without an address. The address has to be supplied
     * through the superclass before {@link #open()} is called.
     */
    public NettyServer() {}


    /**
     * Binds the server socket and blocks until the server channel is closed.
     *
     * <p>If the calling thread is interrupted while waiting, the interrupt
     * status is restored and the method returns.
     *
     * @throws MiniMQException if no address has been configured
     */
    @Override
    public void open() {
        if (addr == null) {
            // Fail with a descriptive error instead of a bare NullPointerException.
            throw new MiniMQException("server address is not set");
        }

        ServerBootstrap bootstrap = new ServerBootstrap();
        bootstrap.group(boss, worker)
                .channel(NioServerSocketChannel.class)
                // Options of the listening (server) channel.
                .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, ConstantUtil.TIMEOUT)
                .option(ChannelOption.SO_BACKLOG, 1024)
                .option(ChannelOption.SO_REUSEADDR, true)
                .option(ChannelOption.SO_RCVBUF, ConstantUtil.socketRcvBufSize)
                // Options of accepted connections. SO_KEEPALIVE and SO_SNDBUF are not
                // supported by the NIO server channel, so they must be child options
                // to take effect. SO_TIMEOUT only applies to the blocking (OIO)
                // transport and is therefore not set here.
                .childOption(ChannelOption.SO_KEEPALIVE, true)
                .childOption(ChannelOption.TCP_NODELAY, true)
                .childOption(ChannelOption.SO_SNDBUF, ConstantUtil.socketSndBufSize)
                .childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT)
                .handler(new LoggingHandler(LogLevel.INFO))
                .childHandler(new ChildChannelHandler());

        try {
            ChannelFuture future = bootstrap.bind(addr.getPort()).sync();
            // Block until the server channel is closed.
            future.channel().closeFuture().sync();
        } catch (InterruptedException e) {
            // Restore the interrupt flag so callers can observe the interruption.
            Thread.currentThread().interrupt();
            e.printStackTrace();
        }

    }

    /**
     * Initiates a graceful shutdown of all thread pools owned by this server:
     * the I/O event loops and the business executor group.
     */
    @Override
    public void close() {
        worker.shutdownGracefully();
        boss.shutdownGracefully();
        // The business executor owns its own threads and must be released as well.
        defaultEventExecutorGroup.shutdownGracefully();
    }


    /**
     * Builds the handler pipeline for every accepted connection.
     */
    private class ChildChannelHandler extends ChannelInitializer<SocketChannel>{

        @Override
        protected void initChannel(SocketChannel channel) throws Exception {
            ChannelPipeline pip = channel.pipeline();
            // Messages are (de)serialized with the Protostuff codec; the Java
            // serialization based ObjectDecoder/ObjectEncoder are not used.
            pip.addLast(new MessageObjectEncoder(ProtostuffCodec.OneInstance()));
            pip.addLast(new MessageObjectDecoder(ProtostuffCodec.OneInstance()));
            // Fires an idle event when there was neither a read nor a write for 120 seconds.
            pip.addLast("timeout",new IdleStateHandler(0, 0, 120));
            // Run the business handler on the dedicated executor group. Calling
            // addLast(group) without handlers adds nothing, so the handler has to
            // be passed together with the group.
            pip.addLast(defaultEventExecutorGroup, new NettySeverHandler(handlers));
        }
    }

}
